{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pyspark Random Forest Regression"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1. Set up spark context and SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Python Spark Random Forest Regression\") \\\n",
    "    .config(\"spark.some.config.option\", \"some-value\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2. load dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = spark.read.format('com.databricks.spark.csv').\\\n",
    "                               options(header='true', \\\n",
    "                               inferschema='true').load(\"./data/WineData.csv\",header=True);"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- fixed acidity: double (nullable = true)\n",
      " |-- volatile acidity: double (nullable = true)\n",
      " |-- citric acid: double (nullable = true)\n",
      " |-- residual sugar: double (nullable = true)\n",
      " |-- chlorides: double (nullable = true)\n",
      " |-- free sulfur dioxide: double (nullable = true)\n",
      " |-- total sulfur dioxide: double (nullable = true)\n",
      " |-- density: double (nullable = true)\n",
      " |-- pH: double (nullable = true)\n",
      " |-- sulphates: double (nullable = true)\n",
      " |-- alcohol: double (nullable = true)\n",
      " |-- quality: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# convert the data to dense vector\n",
    "#def transData(row):\n",
    "#    return Row(label=row[\"quality\"],\n",
    "#               features=Vectors.dense([row[\"fixed acidity\"],\n",
    "#                                       row[\"volatile acidity\"],\n",
    "#                                       row[\"citric acid\"],\n",
    "#                                       row[\"residual sugar\"],\n",
    "#                                       row[\"chlorides\"], \n",
    "#                                       row[\"free sulfur dioxide\"],\n",
    "#                                       row[\"total sulfur dioxide\"],\n",
    "#                                       row[\"residual sugar\"],\n",
    "#                                       row[\"density\"],                                          \n",
    "#                                       row[\"pH\"],\n",
    "#                                       row[\"sulphates\"],\n",
    "#                                       row[\"alcohol\"]\n",
    "#                                      ]))\n",
    "def transData(data):\n",
    "    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.regression import RandomForestRegressor\n",
    "from pyspark.ml.feature import VectorIndexer\n",
    "from pyspark.ml.evaluation import RegressionEvaluator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+\n",
      "|            features|label|\n",
      "+--------------------+-----+\n",
      "|[7.4,0.7,0.0,1.9,...|    5|\n",
      "|[7.8,0.88,0.0,2.6...|    5|\n",
      "|[7.8,0.76,0.04,2....|    5|\n",
      "|[11.2,0.28,0.56,1...|    6|\n",
      "|[7.4,0.7,0.0,1.9,...|    5|\n",
      "|[7.4,0.66,0.0,1.8...|    5|\n",
      "+--------------------+-----+\n",
      "only showing top 6 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import Row\n",
    "from pyspark.ml.linalg import Vectors\n",
    "\n",
    "#transformed = df.rdd.map(transData).toDF() \n",
    "transformed= transData(df)\n",
    "transformed.show(6)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Split the data into training and test sets (30% held out for testing)\n",
    "(trainingData, testData) = transformed.randomSplit([0.7, 0.3])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Train a RandomForest model.\n",
    "rf = RandomForestRegressor()\n",
    "model = rf.fit(trainingData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "20"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model.getNumTrees"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Make predictions.\n",
    "predictions = model.transform(testData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+------------------+\n",
      "|            features|label|        prediction|\n",
      "+--------------------+-----+------------------+\n",
      "|[4.9,0.42,0.0,2.1...|    7| 6.489667556875804|\n",
      "|[5.1,0.42,0.0,1.8...|    7| 6.267301910170284|\n",
      "|[5.1,0.585,0.0,1....|    7|6.0526786505470245|\n",
      "|[5.2,0.32,0.25,1....|    5| 5.257985010985523|\n",
      "|[5.2,0.48,0.04,1....|    7| 5.943264423589821|\n",
      "|[5.2,0.645,0.0,2....|    6| 5.909094836353475|\n",
      "|[5.3,0.47,0.11,2....|    7|  6.41491729478567|\n",
      "|[5.4,0.74,0.0,1.2...|    6| 5.855394156577842|\n",
      "|[5.4,0.835,0.08,1...|    7| 5.989289110209313|\n",
      "|[5.5,0.49,0.03,1....|    8| 6.245232161181737|\n",
      "+--------------------+-----+------------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "predictions.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+-----+--------------------+\n",
      "|        prediction|label|            features|\n",
      "+------------------+-----+--------------------+\n",
      "| 6.489667556875804|    7|[4.9,0.42,0.0,2.1...|\n",
      "| 6.267301910170284|    7|[5.1,0.42,0.0,1.8...|\n",
      "|6.0526786505470245|    7|[5.1,0.585,0.0,1....|\n",
      "| 5.257985010985523|    5|[5.2,0.32,0.25,1....|\n",
      "| 5.943264423589821|    7|[5.2,0.48,0.04,1....|\n",
      "+------------------+-----+--------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Select example rows to display.\n",
    "predictions.select(\"prediction\", \"label\", \"features\").show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Root Mean Squared Error (RMSE) on test data = 0.659148\n"
     ]
    }
   ],
   "source": [
    "# Select (prediction, true label) and compute test error\n",
    "evaluator = RegressionEvaluator(\n",
    "    labelCol=\"label\", predictionCol=\"prediction\", metricName=\"rmse\")\n",
    "rmse = evaluator.evaluate(predictions)\n",
    "print(\"Root Mean Squared Error (RMSE) on test data = %g\" % rmse)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pyspark Random Forest Classifier"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.classification import RandomForestClassifier\n",
    "from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = spark.read.format('com.databricks.spark.csv').\\\n",
    "                               options(header='true', \\\n",
    "                               inferschema='true').load(\"./data/WineData.csv\",header=True);"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- fixed acidity: double (nullable = true)\n",
      " |-- volatile acidity: double (nullable = true)\n",
      " |-- citric acid: double (nullable = true)\n",
      " |-- residual sugar: double (nullable = true)\n",
      " |-- chlorides: double (nullable = true)\n",
      " |-- free sulfur dioxide: double (nullable = true)\n",
      " |-- total sulfur dioxide: double (nullable = true)\n",
      " |-- density: double (nullable = true)\n",
      " |-- pH: double (nullable = true)\n",
      " |-- sulphates: double (nullable = true)\n",
      " |-- alcohol: double (nullable = true)\n",
      " |-- quality: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# convert the data to dense vector\n",
    "def transData(data):\n",
    "    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+\n",
      "|            features|label|\n",
      "+--------------------+-----+\n",
      "|[7.4,0.7,0.0,1.9,...|    5|\n",
      "|[7.8,0.88,0.0,2.6...|    5|\n",
      "|[7.8,0.76,0.04,2....|    5|\n",
      "|[11.2,0.28,0.56,1...|    6|\n",
      "|[7.4,0.7,0.0,1.9,...|    5|\n",
      "|[7.4,0.66,0.0,1.8...|    5|\n",
      "+--------------------+-----+\n",
      "only showing top 6 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import Row\n",
    "from pyspark.ml.linalg import Vectors\n",
    "\n",
    "data= transData(df)\n",
    "data.show(6)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+------------+\n",
      "|            features|label|indexedLabel|\n",
      "+--------------------+-----+------------+\n",
      "|[7.4,0.7,0.0,1.9,...|    5|         0.0|\n",
      "|[7.8,0.88,0.0,2.6...|    5|         0.0|\n",
      "|[7.8,0.76,0.04,2....|    5|         0.0|\n",
      "|[11.2,0.28,0.56,1...|    6|         1.0|\n",
      "|[7.4,0.7,0.0,1.9,...|    5|         0.0|\n",
      "|[7.4,0.66,0.0,1.8...|    5|         0.0|\n",
      "+--------------------+-----+------------+\n",
      "only showing top 6 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Index labels, adding metadata to the label column.\n",
    "# Fit on whole dataset to include all labels in index.\n",
    "labelIndexer = StringIndexer(inputCol=\"label\", outputCol=\"indexedLabel\").fit(data)\n",
    "labelIndexer.transform(data).show(6)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+--------------------+\n",
      "|            features|label|     indexedFeatures|\n",
      "+--------------------+-----+--------------------+\n",
      "|[7.4,0.7,0.0,1.9,...|    5|[7.4,0.7,0.0,1.9,...|\n",
      "|[7.8,0.88,0.0,2.6...|    5|[7.8,0.88,0.0,2.6...|\n",
      "|[7.8,0.76,0.04,2....|    5|[7.8,0.76,0.04,2....|\n",
      "|[11.2,0.28,0.56,1...|    6|[11.2,0.28,0.56,1...|\n",
      "|[7.4,0.7,0.0,1.9,...|    5|[7.4,0.7,0.0,1.9,...|\n",
      "|[7.4,0.66,0.0,1.8...|    5|[7.4,0.66,0.0,1.8...|\n",
      "+--------------------+-----+--------------------+\n",
      "only showing top 6 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Automatically identify categorical features, and index them.\n",
    "# Set maxCategories so features with > 4 distinct values are treated as continuous.\n",
    "featureIndexer =VectorIndexer(inputCol=\"features\", \\\n",
    "                              outputCol=\"indexedFeatures\", \\\n",
    "                              maxCategories=4).fit(data)\n",
    "\n",
    "featureIndexer.transform(data).show(6)   "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Split the data into training and test sets (30% held out for testing)\n",
    "(trainingData, testData) = transformed.randomSplit([0.7, 0.3])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Train a RandomForest model.\n",
    "rf = RandomForestClassifier(labelCol=\"indexedLabel\", featuresCol=\"indexedFeatures\", numTrees=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Convert indexed labels back to original labels.\n",
    "labelConverter = IndexToString(inputCol=\"prediction\", outputCol=\"predictedLabel\",\n",
    "                               labels=labelIndexer.labels)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Chain indexers and forest in a Pipeline\n",
    "pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Train model.  This also runs the indexers.\n",
    "model = pipeline.fit(trainingData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Make predictions.\n",
    "predictions = model.transform(testData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+--------------+\n",
      "|            features|label|predictedLabel|\n",
      "+--------------------+-----+--------------+\n",
      "|[4.7,0.6,0.17,2.3...|    6|             5|\n",
      "|[5.0,1.02,0.04,1....|    4|             5|\n",
      "|[5.1,0.42,0.0,1.8...|    7|             6|\n",
      "|[5.1,0.47,0.02,1....|    6|             6|\n",
      "|[5.2,0.32,0.25,1....|    5|             5|\n",
      "+--------------------+-----+--------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Select example rows to display.\n",
    "predictions.select(\"features\",\"label\",\"predictedLabel\").show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Test Error = 0.402083\n"
     ]
    }
   ],
   "source": [
    "# Select (prediction, true label) and compute test error\n",
    "evaluator = MulticlassClassificationEvaluator(\n",
    "    labelCol=\"indexedLabel\", predictionCol=\"prediction\", metricName=\"accuracy\")\n",
    "accuracy = evaluator.evaluate(predictions)\n",
    "print(\"Test Error = %g\" % (1.0 - accuracy))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "RandomForestClassificationModel (uid=rfc_d3482d5f110a) with 10 trees"
      ]
     },
     "execution_count": 45,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rfModel = model.stages[2]\n",
    "rfModel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[DecisionTreeClassificationModel (uid=dtc_83a486d1ea3f) of depth 5 with 59 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_ecc86f56fd85) of depth 5 with 57 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_c808860908b2) of depth 5 with 53 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_9cc34e2a2a6f) of depth 5 with 61 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_67e03b4e5ce4) of depth 5 with 61 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_1d972cb63e4e) of depth 5 with 55 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_8d0e0c19f6fa) of depth 5 with 59 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_650db857e494) of depth 5 with 61 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_cc2e16db50a1) of depth 5 with 61 nodes,\n",
       " DecisionTreeClassificationModel (uid=dtc_14ac6d89880a) of depth 5 with 59 nodes]"
      ]
     },
     "execution_count": 46,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rfModel.trees"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [conda root]",
   "language": "python",
   "name": "conda-root-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
